/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.fnexecution.artifact;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;



import com.fasterxml.jackson.core.JsonProcessingException;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

import java.io.Serializable;

import java.nio.channels.WritableByteChannel;

import java.nio.charset.Charset;

import java.nio.charset.StandardCharsets;

import java.util.Collections;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.ProxyManifest;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.PutArtifactMetadata;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;

import com.bff.gaia.unified.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;

import com.bff.gaia.unified.runners.fnexecution.FnService;

import com.bff.gaia.unified.sdk.io.FileSystems;

import com.bff.gaia.unified.sdk.io.fs.MoveOptions.StandardMoveOptions;

import com.bff.gaia.unified.sdk.io.fs.ResolveOptions.StandardResolveOptions;

import com.bff.gaia.unified.sdk.io.fs.ResourceId;

import com.bff.gaia.unified.sdk.util.MimeTypes;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.com.google.protobuf.util.JsonFormat;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.Status;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;

import com.bff.gaia.unified.vendor.guava.com.google.common.hash.Hasher;

import com.bff.gaia.unified.vendor.guava.com.google.common.hash.Hashing;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



/**

 * This implementation is experimental.

 *

 * <p>{@link ArtifactStagingServiceImplBase} based on unified file system. {@link

 * UnifiedFileSystemArtifactStagingService} requires {@link StagingSessionToken} in every me call. The

 * manifest is put in {@link StagingSessionToken#getBasePath()}/{@link

 * StagingSessionToken#getSessionId()} and artifacts are put in {@link

 * StagingSessionToken#getBasePath()}/{@link StagingSessionToken#getSessionId()}/{@link

 * UnifiedFileSystemArtifactStagingService#ARTIFACTS}.

 *

 * <p>The returned token is the path to the manifest file.

 *

 * <p>The manifest file is encoded in {@link ProxyManifest}.

 */

public class UnifiedFileSystemArtifactStagingService extends ArtifactStagingServiceImplBase

    implements FnService {



  private static final Logger LOG =

      LoggerFactory.getLogger(UnifiedFileSystemArtifactStagingService.class);

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Use UTF8 for all text encoding.

  private static final Charset CHARSET = StandardCharsets.UTF_8;

  public static final String MANIFEST = "MANIFEST";

  public static final String ARTIFACTS = "artifacts";



  @Override

  public StreamObserver<PutArtifactRequest> putArtifact(

      StreamObserver<PutArtifactResponse> responseObserver) {

    return new PutArtifactStreamObserver(responseObserver);

  }



  @Override

  public void commitManifest(

      CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) {

    try {

      StagingSessionToken stagingSessionToken =

          StagingSessionToken.decode(request.getStagingSessionToken());

      ResourceId manifestResourceId = getManifestFileResourceId(stagingSessionToken);

      ResourceId artifactDirResourceId = getArtifactDirResourceId(stagingSessionToken);

      ProxyManifest.Builder proxyManifestBuilder =

          ProxyManifest.newBuilder().setManifest(request.getManifest());

      for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {

        proxyManifestBuilder.addLocation(

            Location.newBuilder()

                .setName(artifactMetadata.getName())

                .setUri(

                    artifactDirResourceId

                        .resolve(

                            encodedFileName(artifactMetadata), StandardResolveOptions.RESOLVE_FILE)

                        .toString())

                .build());

      }

      try (WritableByteChannel manifestWritableByteChannel =

          FileSystems.create(manifestResourceId, MimeTypes.TEXT)) {

        manifestWritableByteChannel.write(

            CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build())));

      }

      // TODO: Validate integrity of staged files.

      responseObserver.onNext(

          CommitManifestResponse.newBuilder()

              .setRetrievalToken(manifestResourceId.toString())

              .build());

      responseObserver.onCompleted();

    } catch (Exception e) {

      // TODO: Cleanup all the artifacts.

      LOG.error("Unable to commit manifest.", e);

      responseObserver.onError(e);

    }

  }



  @Override

  public void close() throws Exception {

    // Nothing to close here.

  }



  /**

   * Generate a stagingSessionToken compatible with {@link UnifiedFileSystemArtifactStagingService}.

   *

   * @param sessionId Unique sessionId for artifact staging.

   * @param basePath Base path to upload artifacts.

   * @return Encoded stagingSessionToken.

   */

  public static String generateStagingSessionToken(String sessionId, String basePath) {

    StagingSessionToken stagingSessionToken = new StagingSessionToken();

    stagingSessionToken.setSessionId(sessionId);

    stagingSessionToken.setBasePath(basePath);

    return stagingSessionToken.encode();

  }



  private String encodedFileName(ArtifactMetadata artifactMetadata) {

    return "artifact_"

        + Hashing.sha256().hashString(artifactMetadata.getName(), CHARSET).toString();

  }



  public void removeArtifacts(String stagingSessionToken) throws Exception {

    StagingSessionToken parsedToken = StagingSessionToken.decode(stagingSessionToken);

    ResourceId dir = getJobDirResourceId(parsedToken);

    ResourceId manifestResourceId = dir.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);



    LOG.debug("Removing dir {}", dir);



    ProxyManifest proxyManifest =

        UnifiedFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);

    for (Location location : proxyManifest.getLocationList()) {

      String uri = location.getUri();

      LOG.debug("Removing artifact: {}", uri);

      FileSystems.delete(

          Collections.singletonList(FileSystems.matchNewResource(uri, false /* is directory */)));

    }



    ResourceId artifactsResourceId =

        dir.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);

    if (!proxyManifest.getLocationList().isEmpty()) {

      // directory only exists when there is at least one artifact

      LOG.debug("Removing artifacts dir: {}", artifactsResourceId);

      FileSystems.delete(Collections.singletonList(artifactsResourceId));

    }

    LOG.debug("Removing manifest: {}", manifestResourceId);

    FileSystems.delete(Collections.singletonList(manifestResourceId));

    LOG.debug("Removing empty dir: {}", dir);

    FileSystems.delete(Collections.singletonList(dir));

    LOG.info("Removed dir {}", dir);

  }



  private ResourceId getJobDirResourceId(StagingSessionToken stagingSessionToken) {

    ResourceId baseResourceId;

    // Get or Create the base path

    baseResourceId =

        FileSystems.matchNewResource(stagingSessionToken.getBasePath(), true /* isDirectory */);

    // Using sessionId as the subDir to store artifacts and manifest.

    return baseResourceId.resolve(

        stagingSessionToken.getSessionId(), StandardResolveOptions.RESOLVE_DIRECTORY);

  }



  private ResourceId getManifestFileResourceId(StagingSessionToken stagingSessionToken) {

    return getJobDirResourceId(stagingSessionToken)

        .resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);

  }



  private ResourceId getArtifactDirResourceId(StagingSessionToken stagingSessionToken) {

    return getJobDirResourceId(stagingSessionToken)

        .resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);

  }



  private class PutArtifactStreamObserver implements StreamObserver<PutArtifactRequest> {



    private final StreamObserver<PutArtifactResponse> outboundObserver;

    private PutArtifactMetadata metadata;

    private ResourceId artifactId;

    private WritableByteChannel artifactWritableByteChannel;

    private Hasher hasher;



    PutArtifactStreamObserver(StreamObserver<PutArtifactResponse> outboundObserver) {

      this.outboundObserver = outboundObserver;

    }



    @Override

    public void onNext(PutArtifactRequest putArtifactRequest) {

      // Create the directory structure for storing artifacts in the first call.

      if (metadata == null) {

        checkNotNull(putArtifactRequest);

        checkNotNull(putArtifactRequest.getMetadata());

        metadata = putArtifactRequest.getMetadata();

        LOG.debug("stored metadata: {}", metadata);

        // Check the base path exists or create the base path

        try {

          ResourceId artifactsDirId =

              getArtifactDirResourceId(

                  StagingSessionToken.decode(

                      putArtifactRequest.getMetadata().getStagingSessionToken()));

          artifactId =

              artifactsDirId.resolve(

                  encodedFileName(metadata.getMetadata()), StandardResolveOptions.RESOLVE_FILE);

          LOG.debug(

              "Going to stage artifact {} to {}.", metadata.getMetadata().getName(), artifactId);

          artifactWritableByteChannel = FileSystems.create(artifactId, MimeTypes.BINARY);

          hasher = Hashing.sha256().newHasher();

        } catch (Exception e) {

          String message =

              String.format(

                  "Failed to begin staging artifact %s", metadata.getMetadata().getName());

          outboundObserver.onError(

              new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause(e)));

        }

      } else {

        try {

          ByteString data = putArtifactRequest.getData().getData();

          artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());

          hasher.putBytes(data.toByteArray());

        } catch (IOException e) {

          String message =

              String.format(

                  "Failed to write chunk of artifact %s to %s",

                  metadata.getMetadata().getName(), artifactId);

          outboundObserver.onError(

              new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause(e)));

        }

      }

    }



    @Override

    public void onError(Throwable throwable) {

      // Delete the artifact.

      LOG.error("Staging artifact failed for " + artifactId, throwable);

      try {

        if (artifactWritableByteChannel != null) {

          artifactWritableByteChannel.close();

        }

        if (artifactId != null) {

          FileSystems.delete(

              Collections.singletonList(artifactId), StandardMoveOptions.IGNORE_MISSING_FILES);

        }



      } catch (IOException e) {

        outboundObserver.onError(

            new StatusRuntimeException(

                Status.DATA_LOSS.withDescription(

                    String.format("Failed to clean up artifact file %s", artifactId))));

        return;

      }

      outboundObserver.onError(

          new StatusRuntimeException(

              Status.DATA_LOSS

                  .withDescription(String.format("Failed to stage artifact %s", artifactId))

                  .withCause(throwable)));

    }



    @Override

    public void onCompleted() {

      // Close the stream.

      LOG.debug("Staging artifact completed for " + artifactId);

      if (artifactWritableByteChannel != null) {

        try {

          artifactWritableByteChannel.close();

        } catch (IOException e) {

          onError(e);

          return;

        }

      }

      String expectedSha256 = metadata.getMetadata().getSha256();

      if (expectedSha256 != null && !expectedSha256.isEmpty()) {

        String actualSha256 = hasher.hash().toString();

        if (!actualSha256.equals(expectedSha256)) {

          outboundObserver.onError(

              new StatusRuntimeException(

                  Status.INVALID_ARGUMENT.withDescription(

                      String.format(

                          "Artifact %s is corrupt: expected sah256 %s, but has sha256 %s",

                          metadata.getMetadata().getName(), expectedSha256, actualSha256))));

          return;

        }

      }

      outboundObserver.onNext(PutArtifactResponse.newBuilder().build());

      outboundObserver.onCompleted();

    }

  }



  /**

   * Serializable StagingSessionToken used to stage files with {@link

   * UnifiedFileSystemArtifactStagingService}.

   */

  private static class StagingSessionToken implements Serializable {



    private String sessionId;

    private String basePath;



    /** Access is public for json conversion. */

    public String getSessionId() {

      return sessionId;

    }



    private void setSessionId(String sessionId) {

      this.sessionId = sessionId;

    }



    /** Access is public for json conversion. */

    public String getBasePath() {

      return basePath;

    }



    private void setBasePath(String basePath) {

      this.basePath = basePath;

    }



    public String encode() {

      try {

        return MAPPER.writeValueAsString(this);

      } catch (JsonProcessingException e) {

        String message =

            String.format("Error %s occurred while serializing %s", e.getMessage(), this);

        throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));

      }

    }



    public static StagingSessionToken decode(String stagingSessionToken) throws Exception {

      try {

        return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);

      } catch (JsonProcessingException e) {

        String message =

            String.format(

                "Unable to deserialize staging token %s. Expected format: %s. Error: %s",

                stagingSessionToken,

                "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",

                e.getMessage());

        throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));

      }

    }



    @Override

    public String toString() {

      return "StagingSessionToken{"

          + "sessionId='"

          + sessionId

          + "', "

          + "basePath='"

          + basePath

          + "'"

          + "}";

    }

  }

}